//package com.taldh.kafka.producer;
//
//import com.alibaba.fastjson.JSON;
//import com.alibaba.fastjson.JSONObject;
//import com.google.gson.Gson;
//import com.google.gson.GsonBuilder;
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.kafka.core.KafkaTemplate;
//import org.springframework.kafka.support.SendResult;
//import org.springframework.stereotype.Service;
//import org.springframework.util.concurrent.ListenableFuture;
//import org.springframework.util.concurrent.ListenableFutureCallback;
//
//@Service
//public class ProducerService {
//    private static final Logger logger = LoggerFactory.getLogger(ProducerService.class);
//
//    private Gson gson = new GsonBuilder().create();
//
//    @Autowired
//    private KafkaTemplate template;
//
//    //发送消息方法
//    public void sendJson(String topic, String json) {
//        JSONObject jsonObj = JSON.parseObject(json);
//
//        jsonObj.put("topic", topic);
//        jsonObj.put("group.id", "wanda_market_kafka_group");
//        jsonObj.put("ts", System.currentTimeMillis() + "");
//
//        logger.info("json+++++++++++++++++++++  message = {}", jsonObj.toJSONString());
//
//        ListenableFuture<SendResult<String, String>> future = template.send(topic, jsonObj.toJSONString());
//        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
//            @Override
//            public void onSuccess(SendResult<String, String> result) {
//                System.out.println("msg OK." + result.toString());
//            }
//
//            @Override
//            public void onFailure(Throwable ex) {
//                System.out.println("msg send failed: ");
//            }
//        });
//    }
//}